package actor

import (
	"fmt"
	"net"
	"net/http"
	nws "nggs/network/websocket/v2"
	"sync"
	"sync/atomic"
	"time"

	"github.com/AsynkronIT/protoactor-go/actor"
	"github.com/AsynkronIT/protoactor-go/mailbox"
	"github.com/gin-gonic/gin"

	nevent "nggs/event"
	nexport "nggs/export"
	nlog "nggs/log"
	npb "nggs/network/protocol/protobuf/v3"
	nrpc "nggs/rpc"
	nutil "nggs/util"
)

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type VFOnStarted func(ctx actor.Context)
type VFOnStopping func(ctx actor.Context)
type VFOnStopped func(ctx actor.Context)
type VFOnReceiveMessage func(ctx actor.Context)
type VFOnActorTerminated func(who *actor.PID, ctx actor.Context)
type VFOnRestarting func(ctx actor.Context)
type VFOnRestarted func(ctx actor.Context)
type VFBeforeTriggerTimer func(id TimerID, tag TimerTag, ctx actor.Context) (ok bool, err error)
type VFAfterTriggerTimer func(err error, id TimerID, tag TimerTag, ctx actor.Context)
type VFOnPulse func(ctx actor.Context)
type VFBeforeDispatchEvent func(inEvent nevent.IEvent, ctx actor.Context) (args []interface{}, ok bool, err error)
type VFAfterDispatchEvent func(err error, iEvent nevent.IEvent, ctx actor.Context, args ...interface{})
type VFBeforeDispatchRPC func(sender *actor.PID, iRequestMessage nrpc.IMessage, ctx actor.Context) (args []interface{}, ok bool, err error)
type VFAfterDispatchRPC func(err error, sender *actor.PID, iRequestMessage nrpc.IMessage, iResponseMessage nrpc.IMessage, ctx actor.Context, args ...interface{})
type VFBeforeCompleteHttpTask func(id HttpTaskID, tag HttpTaskTag, ctx actor.Context) (ok bool, err error)
type VFAfterCompleteHttpTask func(err error, id HttpTaskID, tag HttpTaskTag, ctx actor.Context)
type VFBeforeRunHttpServer func(engine *gin.Engine)
type VFAfterRunHttpServer func(err error, engine *gin.Engine)
type VFBeforeDispatchHttpRequest func(ctx *gin.Context) (ok bool, err error)
type VFAfterDispatchHttpRequest func(err error, ctx *gin.Context)
type VFBeforeDispatchMessage func(iProtocol npb.IProtocol, iRecv npb.IMessage, ctx actor.Context) (iSend npb.IMessage, args []interface{}, ok bool, err error)
type VFAfterDispatchMessage func(err error, iRecv npb.IMessage, iSend npb.IMessage, ctx actor.Context, args ...interface{})
type VFAfterRunWebsocketServer func(err error, server *nws.Server)

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type option struct {
	vfOnStarted                 VFOnStarted
	vfOnStopping                VFOnStopping
	vfOnStopped                 VFOnStopped
	vfOnReceiveMessage          VFOnReceiveMessage
	vfOnActorTerminated         VFOnActorTerminated
	vfOnRestarting              VFOnRestarting
	vfOnRestarted               VFOnRestarted
	vfBeforeTriggerTimer        VFBeforeTriggerTimer
	vfAfterTriggerTimer         VFAfterTriggerTimer
	vfOnPulse                   VFOnPulse
	vfBeforeDispatchEvent       VFBeforeDispatchEvent
	vfAfterDispatchEvent        VFAfterDispatchEvent
	vfBeforeDispatchRPC         VFBeforeDispatchRPC
	vfAfterDispatchRPC          VFAfterDispatchRPC
	vfBeforeCompleteHttpTask    VFBeforeCompleteHttpTask
	vfAfterCompleteHttpTask     VFAfterCompleteHttpTask
	vfBeforeRunHttpServer       VFBeforeRunHttpServer
	vfAfterRunHttpServer        VFAfterRunHttpServer
	vfBeforeDispatchHttpRequest VFBeforeDispatchHttpRequest
	vfAfterDispatchHttpRequest  VFAfterDispatchHttpRequest
	vfBeforeDispatchMessage     VFBeforeDispatchMessage
	vfAfterDispatchMessage      VFAfterDispatchMessage
	vfAfterRunWebsocketServer   VFAfterRunWebsocketServer

	logger nlog.ILogger

	startedWg *sync.WaitGroup
	stoppedWg *sync.WaitGroup

	mailBoxSize int

	decider actor.DeciderFunc

	rpcProtocol nrpc.IProtocol

	pulseInterval time.Duration
	pulseTimerID  TimerID

	httpServerListenAddr string
	httpServerCertFile   string
	httpServerKeyFile    string

	websocketServerConfig              nws.Config
	websocketServerListenAddr          string
	websocketServerCertFile            string
	websocketServerKeyFile             string
	websocketSessionHandlerProduceFunc nws.SessionHandlerProduceFunc

	msgProtocol npb.IProtocol

	logics map[nexport.LogicID]nexport.ILogic
}

type OptionFunc func(option *option)

func newOption() option {
	return option{
		vfOnReceiveMessage:  func(ctx actor.Context) {},
		vfOnStarted:         func(ctx actor.Context) {},
		vfOnStopping:        func(ctx actor.Context) {},
		vfOnStopped:         func(ctx actor.Context) {},
		vfOnActorTerminated: func(who *actor.PID, ctx actor.Context) {},
		vfOnRestarting:      func(ctx actor.Context) {},
		vfOnRestarted:       func(ctx actor.Context) {},
		decider:             DefaultDecider,
		vfBeforeTriggerTimer: func(id TimerID, tag TimerTag, ctx actor.Context) (ok bool, err error) {
			ok = true
			return
		},
		vfAfterTriggerTimer: func(err error, id TimerID, tag TimerTag, ctx actor.Context) {},
		vfOnPulse:           func(ctx actor.Context) {},
		vfBeforeDispatchEvent: func(iEvent nevent.IEvent, ctx actor.Context) (args []interface{}, ok bool, err error) {
			ok = true
			return
		},
		vfAfterDispatchEvent: func(err error, iEvent nevent.IEvent, ctx actor.Context, args ...interface{}) {},
		vfBeforeDispatchRPC: func(sender *actor.PID, iRequestMessage nrpc.IMessage, ctx actor.Context) (args []interface{}, ok bool, err error) {
			ok = true
			return
		},
		vfAfterDispatchRPC: func(err error, sender *actor.PID, iRequestMessage nrpc.IMessage, iResponseMessage nrpc.IMessage, ctx actor.Context, args ...interface{}) {
		},
		vfBeforeCompleteHttpTask: func(id HttpTaskID, tag HttpTaskTag, ctx actor.Context) (ok bool, err error) {
			ok = true
			return
		},
		vfAfterCompleteHttpTask: func(err error, id HttpTaskID, tag HttpTaskTag, ctx actor.Context) {},
		vfBeforeRunHttpServer:   func(engine *gin.Engine) {},
		vfAfterRunHttpServer:    func(err error, engine *gin.Engine) {},
		vfBeforeDispatchHttpRequest: func(ctx *gin.Context) (ok bool, err error) {
			return true, nil
		},
		vfAfterDispatchHttpRequest: func(err error, ctx *gin.Context) {},
		vfBeforeDispatchMessage: func(iProtocol npb.IProtocol, iRecv npb.IMessage, ctx actor.Context) (iSend npb.IMessage, args []interface{}, ok bool, err error) {
			ok = true
			return
		},
		vfAfterDispatchMessage:    func(err error, iRecv npb.IMessage, iSend npb.IMessage, ctx actor.Context, args ...interface{}) {},
		vfAfterRunWebsocketServer: func(err error, server *nws.Server) {},
		websocketServerConfig:     nws.DefaultConfig,
		websocketSessionHandlerProduceFunc: func(r *http.Request) nws.ISessionHandler {
			return nil
		},
	}
}

func WithOnReceiveMessage(vf VFOnReceiveMessage) OptionFunc {
	return func(option *option) {
		if vf != nil {
			option.vfOnReceiveMessage = vf
		}
	}
}

func WithOnStarted(vf VFOnStarted) OptionFunc {
	return func(option *option) {
		if vf != nil {
			option.vfOnStarted = vf
		}
	}
}

func WithOnStopping(vf VFOnStopping) OptionFunc {
	return func(option *option) {
		if vf != nil {
			option.vfOnStopping = vf
		}
	}
}

func WithOnStopped(vf VFOnStopped) OptionFunc {
	return func(option *option) {
		if vf != nil {
			option.vfOnStopped = vf
		}
	}
}

func WithOnActorTerminate(vf VFOnActorTerminated) OptionFunc {
	return func(option *option) {
		if vf != nil {
			option.vfOnActorTerminated = vf
		}
	}
}

func WithOnRestarting(vf VFOnRestarting) OptionFunc {
	return func(option *option) {
		if vf != nil {
			option.vfOnRestarting = vf
		}
	}
}

func WithOnRestarted(vf VFOnRestarted) OptionFunc {
	return func(option *option) {
		if vf != nil {
			option.vfOnRestarted = vf
		}
	}
}

func WithLogger(logger nlog.ILogger) OptionFunc {
	return func(option *option) {
		if logger != nil {
			option.logger = logger
		}
	}
}

func WithStartedWaitGroup(startedWg *sync.WaitGroup) OptionFunc {
	return func(option *option) {
		option.startedWg = startedWg
	}
}

func WithStoppedWaitGroup(stoppedWg *sync.WaitGroup) OptionFunc {
	return func(option *option) {
		option.stoppedWg = stoppedWg
	}
}

func WithMailBoxSize(mailBoxSize int) OptionFunc {
	return func(option *option) {
		if mailBoxSize > 0 {
			option.mailBoxSize = mailBoxSize
		}
	}
}

func WithDecider(decider actor.DeciderFunc) OptionFunc {
	return func(option *option) {
		if decider != nil {
			option.decider = decider
		}
	}
}

func WithTimer(before VFBeforeTriggerTimer, after VFAfterTriggerTimer) OptionFunc {
	return func(option *option) {
		if before != nil {
			option.vfBeforeTriggerTimer = before
		}
		if after != nil {
			option.vfAfterTriggerTimer = after
		}
	}
}

func WithPulse(interval time.Duration, vf VFOnPulse) OptionFunc {
	return func(option *option) {
		if interval > 0 {
			option.pulseInterval = interval
		}
		if vf != nil {
			option.vfOnPulse = vf
		}
	}
}

func WithEvent(before VFBeforeDispatchEvent, after VFAfterDispatchEvent) OptionFunc {
	return func(option *option) {
		if before != nil {
			option.vfBeforeDispatchEvent = before
		}
		if after != nil {
			option.vfAfterDispatchEvent = after
		}
	}
}

func WithRPC(iProtocol nrpc.IProtocol, before VFBeforeDispatchRPC, after VFAfterDispatchRPC) OptionFunc {
	return func(option *option) {
		if iProtocol != nil {
			option.rpcProtocol = iProtocol
		}
		if before != nil {
			option.vfBeforeDispatchRPC = before
		}
		if after != nil {
			option.vfAfterDispatchRPC = after
		}
	}
}

func WithHttpTask(before VFBeforeCompleteHttpTask, after VFAfterCompleteHttpTask) OptionFunc {
	return func(option *option) {
		if before != nil {
			option.vfBeforeCompleteHttpTask = before
		}
		if after != nil {
			option.vfAfterCompleteHttpTask = after
		}
	}
}

func WithHttpServer(listenAddr string, certFile string, keyFile string, beforeRun VFBeforeRunHttpServer, afterRun VFAfterRunHttpServer,
	beforeDispatch VFBeforeDispatchHttpRequest, afterDispatch VFAfterDispatchHttpRequest) OptionFunc {
	return func(option *option) {
		if listenAddr == "" {
			return
		}
		option.httpServerListenAddr = listenAddr
		if certFile != "" && keyFile != "" {
			option.httpServerCertFile = certFile
			option.httpServerKeyFile = keyFile
		}
		if beforeRun != nil {
			option.vfBeforeRunHttpServer = beforeRun
		}
		if afterRun != nil {
			option.vfAfterRunHttpServer = afterRun
		}
		if beforeDispatch != nil {
			option.vfBeforeDispatchHttpRequest = beforeDispatch
		}
		if afterDispatch != nil {
			option.vfAfterDispatchHttpRequest = afterDispatch
		}
	}
}

func WithWebsocketServer(listenAddr string, certFile string, keyFile string, config *nws.Config, sessionHandlerProduceFunc nws.SessionHandlerProduceFunc, afterRun VFAfterRunWebsocketServer) OptionFunc {
	return func(option *option) {
		if listenAddr == "" {
			return
		}
		option.websocketServerListenAddr = listenAddr
		if certFile != "" && keyFile != "" {
			option.websocketServerCertFile = certFile
			option.websocketServerKeyFile = keyFile
		}
		if config != nil {
			option.websocketServerConfig = *config
		}
		if sessionHandlerProduceFunc != nil {
			option.websocketSessionHandlerProduceFunc = sessionHandlerProduceFunc
		}
		if afterRun != nil {
			option.vfAfterRunWebsocketServer = afterRun
		}
	}
}

func WithMessage(iProtocol npb.IProtocol, before VFBeforeDispatchMessage, after VFAfterDispatchMessage) OptionFunc {
	return func(option *option) {
		if iProtocol != nil {
			option.msgProtocol = iProtocol
		}
		if before != nil {
			option.vfBeforeDispatchMessage = before
		}
		if after != nil {
			option.vfAfterDispatchMessage = after
		}
	}
}

func WithLogics(logics map[nexport.LogicID]nexport.ILogic) OptionFunc {
	return func(option *option) {
		if len(logics) > 0 {
			option.logics = logics
		}
	}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type Actor struct {
	option

	sign string

	pid       *actor.PID
	parentPID *actor.PID

	stopFlag  int32
	isRestart bool

	timerMgr *TimerManager

	pulseTimerID TimerID

	eventDispatcher *nevent.Dispatcher

	rpcDispatcher *nrpc.MessageDispatcher

	promiseSeqGenerator PromiseSeq
	promises            map[PromiseSeq]*Promise

	msgDispatcher npb.IMessageDispatcher

	httpTaskMgr *HttpTaskManager

	httpServer *httpServer

	websocketServer *nws.Server
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func New(optionFuncs ...OptionFunc) *Actor {
	a := &Actor{
		option:              newOption(),
		eventDispatcher:     nevent.NewDispatcher(),
		rpcDispatcher:       nrpc.NewMessageDispatcher(),
		promiseSeqGenerator: 0,
		promises:            map[PromiseSeq]*Promise{},
		msgDispatcher:       npb.NewMessageDispatcher(),
	}

	a.httpServer = newHttpServer(a)

	a.websocketServer = nws.NewServer(a.websocketServerConfig)

	for _, optionFunc := range optionFuncs {
		optionFunc(&a.option)
	}

	if a.logger == nil {
		a.logger = gLogger
	}

	if a.startedWg != nil {
		a.startedWg.Add(1)
	}

	return a
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (a Actor) PID() *actor.PID {
	return a.pid.Clone()
}

func (a Actor) ParentPID() *actor.PID {
	return a.parentPID.Clone()
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (a Actor) Sign() string {
	return a.sign
}

func (a *Actor) SetSign(sign string) {
	a.sign = sign
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (a Actor) Logger() nlog.ILogger {
	return a.logger
}

//func (a *Actor) SetLogger(logger nlog.ILogger) {
//	if logger != nil {
//		a.logger = logger
//	}
//}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (a *Actor) Start(ctx actor.Context, name string) (err error) {
	props := actor.PropsFromProducer(func() actor.Actor { return a })

	if a.mailBoxSize > 0 {
		props = props.WithMailbox(mailbox.Bounded(a.mailBoxSize))
	}

	if a.decider != nil {
		props = props.WithSupervisor(actor.NewOneForOneStrategy(0, 0, a.decider))
	}

	if ctx != nil {
		pid, e := ctx.SpawnNamed(props, name)
		if e != nil {
			err = fmt.Errorf("ctx.SpawnNamed fail, %w", e)
			return
		}
		a.pid = pid.Clone()
	} else {
		pid, e := RootContext.SpawnNamed(props, name)
		if e != nil {
			err = fmt.Errorf("RootContext.SpawnNamed fail, %w", e)
			return e
		}
		a.pid = pid.Clone()
	}

	a.sign = a.pid.Id

	if a.httpServerListenAddr != "" {
		a.vfBeforeRunHttpServer(a.httpServer.serverMux)
		err = a.httpServer.Listen(a.httpServerListenAddr)
		a.vfAfterRunHttpServer(err, a.httpServer.serverMux)
		if err != nil {
			a.Error("http server listen fail, addr=%s, %s", a.httpServerListenAddr, err)
			return
		}
		if a.httpServerCertFile != "" && a.httpServerKeyFile != "" {
			if e := nutil.IsDirOrFileExist(a.httpServerCertFile); e != nil {
				err = fmt.Errorf("http server cert file not exist, cert file=%s, %s", a.httpServerCertFile, e)
				return
			}
			if e := nutil.IsDirOrFileExist(a.httpServerKeyFile); e != nil {
				err = fmt.Errorf("http server key file not exist, key file=%s, %s", a.httpServerKeyFile, e)
				return
			}
			go a.httpServer.ServeTLS(a.httpServerCertFile, a.httpServerKeyFile)
		} else {
			go a.httpServer.Serve()
		}

	}

	if a.websocketServerListenAddr != "" {
		a.websocketServer.HandleProduceSessionHandler(a.websocketSessionHandlerProduceFunc)
		if a.websocketServerCertFile != "" && a.websocketServerKeyFile != "" {
			err = a.websocketServer.ServeTLS(a.websocketServerListenAddr, a.websocketServerCertFile, a.websocketServerKeyFile)
		} else {
			err = a.websocketServer.Serve(a.websocketServerListenAddr)
		}
		a.vfAfterRunWebsocketServer(err, a.websocketServer)
		if err != nil {
			a.Error("start websocket server fail, %s", err)
			return
		}
	}

	return
}

//func (a *Actor) StartWithPrefix(ctx actor.Context, prefix string) (err error) {
//	props := actor.PropsFromProducer(func() actor.Actor { return a })
//
//	if a.mailBoxSize > 0 {
//		props = props.WithMailbox(mailbox.Bounded(a.mailBoxSize))
//	}
//
//	if a.decider != nil {
//		props = props.WithSupervisor(actor.NewOneForOneStrategy(0, 0, a.decider))
//	}
//
//	if ctx != nil {
//		a.pid = ctx.SpawnPrefix(props, prefix).Clone()
//	} else {
//		a.pid = RootContext.SpawnPrefix(props, prefix).Clone()
//	}
//
//	a.sign = a.pid.Id
//
//	return
//}

func (a *Actor) WaitForStarted() {
	if a.startedWg != nil {
		a.startedWg.Wait()
	}
}

func (a Actor) IsStopping() bool {
	return atomic.LoadInt32(&a.stopFlag) == 1
}

func (a *Actor) Stop() error {
	if !atomic.CompareAndSwapInt32(&a.stopFlag, 0, 1) {
		return ErrAlreadyStop
	}
	if a.pid != nil {
		RootContext.Stop(a.pid)
	}
	return nil
}

func (a *Actor) WaitForStopped() {
	if a.stoppedWg != nil {
		a.stoppedWg.Wait()
	}
}

func (a *Actor) StopAndWait() error {
	if !atomic.CompareAndSwapInt32(&a.stopFlag, 0, 1) {
		return ErrAlreadyStop
	}
	if a.pid != nil {
		return RootContext.StopFuture(a.pid).Wait()
	}
	return nil
}

func (a *Actor) Poison() error {
	if !atomic.CompareAndSwapInt32(&a.stopFlag, 0, 1) {
		return ErrAlreadyStop
	}
	if a.pid != nil {
		RootContext.Poison(a.pid)
	}
	return nil
}

func (a *Actor) PoisonAndWait() error {
	if !atomic.CompareAndSwapInt32(&a.stopFlag, 0, 1) {
		return ErrAlreadyStop
	}
	if a.pid != nil {
		return RootContext.PoisonFuture(a.pid).Wait()
	}
	return nil
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (a *Actor) Debug(format string, args ...interface{}) {
	a.logger.Debug("[%s] %s", a.sign, fmt.Sprintf(format, args...))
}

func (a *Actor) Info(format string, args ...interface{}) {
	a.logger.Info("[%s] %s", a.sign, fmt.Sprintf(format, args...))
}

func (a *Actor) Warn(format string, args ...interface{}) {
	a.logger.Warn("[%s] %s", a.sign, fmt.Sprintf(format, args...))
}

func (a *Actor) Error(format string, args ...interface{}) {
	a.logger.Error("[%s] %s", a.sign, fmt.Sprintf(format, args...))
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (a *Actor) NewTimer(dur time.Duration, tag TimerTag, cb TimerCallback) TimerID {
	return a.timerMgr.NewTimerWithTag(dur, tag, cb)
}

func (a *Actor) NewLoopTimer(dur time.Duration, tag TimerID, cb TimerCallback) TimerID {
	return a.timerMgr.NewLoopTimerWithTag(dur, tag, cb)
}

func (a *Actor) StopTimer(id TimerID) error {
	return a.timerMgr.Stop(id)
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////\
func (a *Actor) pulse(timerID TimerID, timerTag TimerTag) {
	a.removeTimeoutPromise()
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (a *Actor) RegisterEventHandler(id nevent.ID, h nevent.Handler) {
	a.eventDispatcher.Register(id, h)
}

func (a *Actor) DispatchEvent(iEvent nevent.IEvent, ctx actor.Context, args ...interface{}) (err error) {
	return a.eventDispatcher.Dispatch(iEvent, ctx, args...)
}

func (a *Actor) PostEvent(iEvent nevent.IEvent) {
	RootContext.Send(a.PID(), iEvent)
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (a *Actor) NewHttpTask(timeout time.Duration, tag HttpTaskTag, prepare OnHttpTaskPrepare, callback OnHttpTaskCompleted) (taskID HttpTaskID, err error) {
	return a.httpTaskMgr.NewTask(timeout, tag, prepare, callback)
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (a *Actor) RegisterRPCHandler(id nrpc.MessageID, handler nrpc.MessageHandler) (err error) {
	if a.rpcProtocol == nil {
		err = fmt.Errorf("rpc protocol is nil")
		return
	}

	if e := a.rpcDispatcher.Register(id, handler); e != nil {
		err = fmt.Errorf("register rpc handler fail, id=%d, %s", id, e)
		return
	}

	return
}

func (a *Actor) NewPromise() (promise *Promise) {
	promise = &Promise{
		PromiseContext: &PromiseContext{
			actor: a,
		},
		beginTime:       time.Now(),
		timeoutDuration: defaultPromiseTimeoutDuration,
	}
	return
}

func (a *Actor) NewRPCPromise(iRequestMessage nrpc.IMessage, receiver *actor.PID, cb PromiseCallback) (promise *Promise) {
	promise = &Promise{
		PromiseContext: &PromiseContext{
			actor:           a,
			iRequestMessage: iRequestMessage,
			receiver:        receiver,
		},
		beginTime:       time.Now(),
		timeoutDuration: defaultPromiseTimeoutDuration,
		callbacks:       []PromiseCallback{cb},
	}
	return
}

func (a *Actor) addPromise(promise *Promise) {
	promise.seq = a.nextPromiseSeq()
	a.promises[promise.seq] = promise
}

func (a Actor) PromiseNum() int {
	return len(a.promises)
}

func (a *Actor) nextPromiseSeq() PromiseSeq {
	a.promiseSeqGenerator += 1
	return a.promiseSeqGenerator
}

func (a *Actor) removeTimeoutPromise() {
	var now = time.Now()
	var needRemoveSeqs = map[PromiseSeq]struct{}{}

	for _, promise := range a.promises {
		if promise.completed {
			needRemoveSeqs[promise.seq] = struct{}{}
			continue
		}
		if !promise.timeout.Before(now) {
			continue
		}
		needRemoveSeqs[promise.seq] = struct{}{}
		promise.onComplete(&ErrPromiseTimeout{
			Seq:              promise.seq,
			RequestMessageID: promise.iRequestMessage.MessageID(),
		})
	}
	for seq, _ := range needRemoveSeqs {
		delete(a.promises, seq)
	}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (a *Actor) RegisterMessageHandler(id npb.MessageID, handler npb.MessageHandler) (err error) {
	if a.msgProtocol == nil {
		err = fmt.Errorf("msg protocol is nil")
		return
	}

	if e := a.msgDispatcher.Register(id, handler); e != nil {
		err = fmt.Errorf("register message handler fail, id=%d, %s", id, e)
		return
	}

	return
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (a *Actor) RegisterHttpRequestHandler(method string, pattern string, fn gin.HandlerFunc, option *HttpRequestHandlerOption) (err error) {
	if a.httpServerListenAddr == "" {
		err = fmt.Errorf("register htpp request handler fail, http server not run")
		return
	}

	if e := a.httpServer.Register(method, pattern, fn, option); e != nil {
		err = fmt.Errorf("register http request handler fail, pattern=%s, %s", pattern, e)
		return
	}

	return
}

func (a Actor) HttpServerListener() net.Listener {
	return a.httpServer.listener
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (a Actor) WebsocketServerSessionNum() int {
	if a.websocketServer == nil {
		return 0
	}
	return a.websocketServer.SessionNum()
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (a *Actor) GetLogic(id nexport.LogicID) nexport.ILogic {
	if len(a.logics) == 0 {
		return nil
	}
	return a.logics[id]
}

func (a *Actor) EachLogic(fn func(id nexport.LogicID, logic nexport.ILogic) (continued bool)) {
	if fn == nil {
		return
	}
	for id, logic := range a.logics {
		if !fn(id, logic) {
			break
		}
	}
}

func (a *Actor) initLogics() (err error) {
	for id, logic := range a.logics {
		err = logic.Init()
		if err != nil {
			a.Error("id=%d, %s", id, err)
			return
		}
	}
	return
}

func (a *Actor) runLogics() {
	for _, logic := range a.logics {
		logic.Run()
	}
}

func (a *Actor) finishLogics() {
	for _, logic := range a.logics {
		logic.Finish()
	}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (a *Actor) Receive(ctx actor.Context) {
	sender := ctx.Sender()
	switch msg := ctx.Message().(type) {
	case *timeout:
		ok, err := a.vfBeforeTriggerTimer(msg.id, msg.tag, ctx)
		if !ok {
			return
		}
		if err != nil {
			a.Error("execute vfBeforeTriggerTimer fail, id=%d, tag=%d, %s", msg.id, msg.tag, err)
			return
		}

		err = a.timerMgr.Trigger(msg.id)
		if err != nil {
			a.Error("trigger timer fail, id=%d, tag=%d, %s", msg.id, msg.tag, err)
		} else {
			if msg.id == a.pulseTimerID {
				a.vfOnPulse(ctx)
				for _, logic := range a.logics {
					logic.OnPulse(ctx)
				}
			}
		}
		a.vfAfterTriggerTimer(err, msg.id, msg.tag, ctx)

	case nevent.IEvent:
		args, ok, err := a.vfBeforeDispatchEvent(msg, ctx)
		if !ok {
			return
		}
		if err != nil {
			a.Error("execute vfBeforeDispatchEvent fail, id=%d, %s", msg.EventID(), err)
			return
		}

		err = a.eventDispatcher.Dispatch(msg, ctx, args...)
		if err != nil {
			a.Warn("dispatch event fail, id=%d, %s", msg.EventID(), err)
		}

		a.vfAfterDispatchEvent(err, msg, ctx, args...)

	case *C2S_Promise:
		if a.rpcProtocol == nil {
			a.Error("recv *C2S_Promise but rpc protocol is nil, seq=%d, request message id=%d", msg.Seq, msg.ReqMsgID)
			return
		}

		if a.pulseInterval <= 0 {
			a.Error("recv *C2S_Promise but pulse interval <= 0, seq=%d, request message id=%d", msg.Seq, msg.ReqMsgID)
			return
		}

		if sender == nil {
			a.Error("sender is nil where recv *C2S_Promise, seq=%d, request message id=%d", msg.Seq, msg.ReqMsgID)
			return
		}

		iRequestMessage, e := a.rpcProtocol.Produce(msg.ReqMsgID)
		if e != nil {
			a.Error("produce promise request message fail, seq=%d, request message id=%d, %s", msg.Seq, msg.ReqMsgID, e)
			return
		}
		// todo 这里无法回收对象，因为业务层会浅拷贝来自rpc包的对象，如果回收对象，会导致各种问题
		//defer a.rpcProtocol.Recycle(iRequestMessage)

		// 反序列化收到的请求包
		e = iRequestMessage.Unmarshal(msg.ReqMsgData)
		if e != nil {
			a.Error("unmarshal promise request message fail, seq=%d, request message id=%d, %s", msg.Seq, msg.ReqMsgID, e)
			return
		}

		args, ok, err := a.vfBeforeDispatchRPC(sender, iRequestMessage, ctx)
		if !ok {
			return
		}
		if err != nil {
			a.Error("execute vfBeforeDispatchRPC in promise fail, seq=%d, sender=%s, request message id=%d, %s", msg.Seq, sender, msg.ReqMsgID, err)
			return
		}

		iResponseMessage, err := a.rpcDispatcher.Dispatch(a.rpcProtocol, nil /*sender*/, iRequestMessage, ctx, args...)
		if err != nil {
			a.Error("dispatch rpc message in promise fail, seq=%d, request message id=%d, %s", msg.Seq, msg.ReqMsgID, err)
		}

		a.vfAfterDispatchRPC(err, sender, iRequestMessage, iResponseMessage, ctx, args...)

		send := Get_S2C_Promise()
		send.Seq = msg.Seq
		if iResponseMessage != nil {
			send.RespMsgID = iResponseMessage.MessageID()
			send.RespMsgData, e = iResponseMessage.Marshal()
			if e != nil {
				a.Error("marshal promise response message fail, seq=%d, request message id=%d, response message id=%d, %s", msg.Seq, msg.ReqMsgID, iResponseMessage.MessageID(), e)
				return
			}
		}
		RootContext.Send(sender, send)

	case *S2C_Promise:
		if a.rpcProtocol == nil {
			a.Error("recv *S2C_Promise but rpc protocol is nil, seq=%d, response message id=%d", msg.Seq, msg.RespMsgID)
			return
		}

		if a.pulseInterval <= 0 {
			a.Error("recv *S2C_Promise but pulse interval <= 0, seq=%d, request message id=%d", msg.Seq, msg.RespMsgID)
			return
		}

		promise, ok := a.promises[msg.Seq]
		if !ok {
			a.Error("promise not exist, seq=%d", msg.Seq)
			return
		}

		var iResponseMessage nrpc.IMessage
		var err error
		if promise.iRequestMessage.ResponseMessageID() > 0 {
			iResponseMessage, err = a.rpcProtocol.Produce(promise.iRequestMessage.ResponseMessageID())
			if err != nil {
				err = &ErrPromiseProduceResponseMessageFail{
					Seq:               promise.seq,
					RequestMessageID:  promise.iRequestMessage.MessageID(),
					ResponseMessageID: promise.iResponseMessage.ResponseMessageID(),
					Err:               err,
				}
				promise.onComplete(err)
				return
			}

			// todo 这里无法回收对象，因为业务层会浅拷贝来自rpc包的对象，如果回收对象，会导致各种问题
			//defer a.rpcProtocol.Recycle(iResponseMessage)

			// 反序列化收到的回复包
			err = iResponseMessage.Unmarshal(msg.RespMsgData)
			if err != nil {
				err = &ErrPromiseUnmarshalResponseMessageFail{
					Seq:               promise.seq,
					RequestMessageID:  promise.iRequestMessage.MessageID(),
					ResponseMessageID: promise.iRequestMessage.ResponseMessageID(),
					Err:               err,
				}
				promise.onComplete(err)
				return
			}
		}

		if promise.onResponse(iResponseMessage) {
			delete(a.promises, promise.seq)
		}

	case nrpc.IMessage:
		if a.rpcProtocol == nil {
			a.Error("recv rpc message but rpc protocol is nil, request message id=%d", msg.MessageID())
			return
		}

		args, ok, err := a.vfBeforeDispatchRPC(sender, msg, ctx)
		if !ok {

			return
		}
		if err != nil {
			a.Error("execute vfBeforeDispatchRPC fail, sender=%s, request message=%s, %s", sender, msg, err)
			return
		}

		iResponseMessage, err := a.rpcDispatcher.Dispatch(a.rpcProtocol, sender, msg, ctx, args...)
		if err != nil {
			a.Error("dispatch rpc message fail, request message id=%d, %s", msg.MessageID(), err)
		}
		a.vfAfterDispatchRPC(err, sender, msg, iResponseMessage, ctx, args...)

	case npb.IMessage:
		if a.msgProtocol == nil {
			a.Error("recv message but protocol is nil, recv message id=%d", msg.Head().MessageID)
			return
		}

		iSend, args, ok, err := a.vfBeforeDispatchMessage(a.msgProtocol, msg, ctx)
		if !ok {
			return
		}
		if err != nil {
			a.Error("execute vfBeforeDispatchMessage fail, recv message=%s, %s", msg, err)
			return
		}

		err = a.msgDispatcher.Dispatch(msg, iSend, ctx, args...)
		if err != nil {
			a.Error("dispatch message fail, recv message id=%d, %s", msg.Head().MessageID, err)
		}

		a.vfAfterDispatchMessage(err, msg, iSend, ctx, args...)

	case *httpTaskCompleted:
		ok, err := a.vfBeforeCompleteHttpTask(msg.id, msg.tag, ctx)
		if !ok {
			return
		}
		if err != nil {
			a.Error("execute vfBeforeCompleteHttpTask fail, id=%d, tag=%d, %s", msg.id, msg.tag, err)
			return
		}

		err = a.httpTaskMgr.OnCompleted(msg.id)
		if err != nil {
			a.Error("do http task fail, %s", err)
		}

		a.vfAfterCompleteHttpTask(err, msg.id, msg.tag, ctx)

	case *httpRequest:
		if a.httpServerListenAddr == "" {
			a.Error("recv *httpRequest, but http server not run")
			return
		}

		ok, err := a.vfBeforeDispatchHttpRequest(msg.ctx)
		if !ok {
			return
		}
		if err != nil {
			a.Error("execute vfBeforeDispatchHttpRequest fail, pattern=%s, %s", msg.ctx.Request.URL.Path, err)
			return
		}

		err = a.httpServer.Dispatch(sender, msg, ctx)
		if err != nil {
			a.Error("dispatch http request fail, pattern=%s, %s", msg.ctx.Request.URL.Path, err)
		}

		a.vfAfterDispatchHttpRequest(err, msg.ctx)

	case *actor.Started:
		if !a.isRestart {
			a.pid = ctx.Self().Clone()
			a.parentPID = ctx.Parent().Clone()

			a.timerMgr = NewTimerManager(a.pid.Clone())
			a.httpTaskMgr = NewHttpTaskManager(a.pid.Clone())

			if err := a.initLogics(); err != nil {
				a.Error("init logic fail, %s", err)
				_ = a.Stop()
				return
			}

			a.runLogics()

			if a.pulseInterval > 0 {
				a.pulseTimerID = a.NewLoopTimer(a.pulseInterval, 0, a.pulse)
			}

			a.vfOnStarted(ctx)

			if a.stoppedWg != nil {
				a.stoppedWg.Add(1)
			}

			if a.startedWg != nil {
				a.startedWg.Done()
			}
		} else {
			a.vfOnRestarted(ctx)
		}

	case *actor.Stopping:
		atomic.CompareAndSwapInt32(&a.stopFlag, 0, 1)
		if a.websocketServerListenAddr != "" {
			_ = a.websocketServer.Close()
		}
		a.vfOnStopping(ctx)
		a.finishLogics()

	case *actor.Stopped:
		a.vfOnStopped(ctx)
		a.timerMgr.StopAll()
		if a.stoppedWg != nil {
			a.stoppedWg.Done()
		}

	case *actor.Restarting:
		a.Error("restarting")
		a.vfOnRestarting(ctx)
		a.isRestart = true

	case *actor.Terminated:
		a.vfOnActorTerminated(msg.Who, ctx)
		for _, logic := range a.logics {
			logic.OnActorTerminated(msg.Who.Clone(), ctx)
		}

	default:
		//defer func() {
		//	if reason := recover(); reason != nil {
		//		a.Error("crashed, reason: %#v, stack: %s", reason, ndebug.Stack())
		//		panic(reason)
		//	}
		//}()

		a.vfOnReceiveMessage(ctx)
	}
}
